package com.example.base;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Minimal Flink Table API job: registers a Kafka-backed table named
 * {@code transactions} and prints every row selected from it.
 *
 * Author wangJinLong
 * Date 2025/8/6 09:01
 **/
public class FlinkApp {

    /**
     * DDL for the Kafka source table.
     *
     * <p>Kafka client settings must be passed with the {@code properties.} prefix;
     * a bare {@code 'group.id'} key is not an option of the Kafka SQL connector and
     * is rejected during option validation, so the consumer group is configured via
     * {@code 'properties.group.id'}.
     */
    private static final String CREATE_TRANSACTIONS_TABLE =
            "CREATE TABLE transactions (\n" +
            "    featuresId  STRING,\n" +
            "    siteSn      STRING,\n" +
            "    captureTime BIGINT\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'data-user',\n" +
            "    'properties.bootstrap.servers' = '106.54.174.109:19092',\n" +
            "    'value.format' = 'json',\n" +
            "    'properties.group.id' = 'group1'\n" +
            ")";

    /**
     * Creates a streaming-mode table environment, registers the {@code transactions}
     * table and prints the result of {@code select * from transactions}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the source table; this only stores metadata in the catalog.
        tEnv.executeSql(CREATE_TRANSACTIONS_TABLE);

        // Submit the query and print its rows. The source is a Kafka topic read in
        // streaming mode, so this call is expected to keep running until the job stops.
        tEnv.executeSql("select * from transactions").print();
    }
}
